import datetime

from kafka import KafkaProducer
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent
)

MYSQL_SETTINGS = {
    "host": "localhost",
    "user": "root",
    "password": "12345678"
}
import json
import sys

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, server_id=1, blocking=True, only_schemas=["flask01"],
                            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])


class DateEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.strftime("%Y-%m-%d %H:%M:%S")
        else:
            return json.JSONEncoder.default(self, obj)


for binlogevent in stream:
    for row in binlogevent.rows:
        event = {"schema": binlogevent.schema, "table": binlogevent.table}
        if isinstance(binlogevent, DeleteRowsEvent):
            event["action"] = "delete"
            event["data"] = row["values"]
        elif isinstance(binlogevent, WriteRowsEvent):
            event["action"] = "insert"
            event["data"] = row["values"]
        elif isinstance(binlogevent, UpdateRowsEvent):
            event["action"] = "update"
            event["before_values"] = dict(row["before_values"].items())
            event["after_values"] = dict(row["after_values"].items())
        json_response = json.dumps(event, ensure_ascii=False, cls=DateEncoder).encode()
        print(json_response)
        producer.send("message", json_response)
        sys.stdout.flush()
stream.close()
